package com.it.table;

import org.apache.flink.table.api.EnvironmentSettings;
import org.apache.flink.table.api.Table;
import org.apache.flink.table.api.TableEnvironment;

/**
 * 聚合计算
 *
 * @author code1997
 */
public class AggragateApiDemo {

    public static void main(String[] args) {
        //默认就是stream模式和blink的分析器
        EnvironmentSettings setting = EnvironmentSettings.newInstance().build();
        TableEnvironment tableEnvironment = TableEnvironment.create(setting);
        //直接连接外部连机器的表被称为连接表
        String clickDDL = "CREATE TABLE click_source (" +
                "`user` STRING, " +
                "`url` STRING, " +
                "`ts` BIGINT" +
                ") WITH (" +
                " 'connector' = 'filesystem' ," +
                " 'path' = 'data/chapter01/click.txt' ," +
                " 'format' = 'csv'" +
                ")";
        tableEnvironment.executeSql(clickDDL);

        Table result = tableEnvironment.sqlQuery("select `user`, COUNT(`url`) as cnt from click_source group by `user`");

        String clickDDLToConsole = "CREATE TABLE click_sink_console (" +
                "`user` STRING , " +
                "`cnt` BIGINT  " +
                ") WITH (" +
                " 'connector' = 'print'" +
                ")";
        tableEnvironment.executeSql(clickDDLToConsole);
        result.executeInsert("click_sink_console");

    }

}
